package com.atguigu.chapter08;

import com.atguigu.Bean.AdsClickLog;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

import java.text.SimpleDateFormat;
import java.time.Duration;
import java.util.Date;

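/**
 * @ClassName: Flink05_Filter_BlackList_Project_1
 * @Description: Per-day ad-click blacklist filter.
 * @Author: kele
 * @Date: 2021/4/9 18:52
 * <p>
 * Requirement:
 * Limit each user's click behaviour within a period of time (e.g. one day).
 * If a user clicks the same ad more than a given limit (e.g. 100 times),
 * the user is added to a blacklist and an alert is raised; the user's further
 * clicks on that ad are no longer counted.
 * <p>
 * Problem addressed by this version: the first click of a new day must not be
 * counted into the previous day's state.
 * <p>
 * Approach: check the date of every incoming record. While records belong to the
 * day currently tracked in state, the state is kept; when a record from a new day
 * arrives, the state is cleared and counting starts over for that day.
 **/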
public class Flink05_Filter_BlackList_Project_1 {

    /** Number of clicks by one user on one ad, within one day, at which the user is blacklisted. */
    private static final long CLICK_LIMIT = 100L;

    /**
     * Builds and runs the blacklist job: reads click lines from a socket, keys them by
     * user + ad, prints normally counted clicks as "main" and blacklist alerts as "blacklist".
     *
     * @param args unused
     * @throws IllegalStateException if the Flink job fails; the original exception is the cause
     */
    public static void main(String[] args) {

        Configuration conf = new Configuration();
        // Fixed port for the local REST endpoint / web UI.
        conf.setInteger("rest.port", 20000);

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);

        env.setParallelism(2);

        // Side output receiving one alert per (user, ad, day) when the click limit is reached.
        OutputTag<String> blackList = new OutputTag<String>("blackList", Types.STRING) {
        };

        // The record timestamp is treated as epoch SECONDS (the per-day logic below multiplies
        // it by 1000), while Flink event time is in milliseconds, so convert here as well.
        WatermarkStrategy<AdsClickLog> ws = WatermarkStrategy.<AdsClickLog>forBoundedOutOfOrderness(Duration.ofSeconds(20))
                .withTimestampAssigner(new SerializableTimestampAssigner<AdsClickLog>() {
                    @Override
                    public long extractTimestamp(AdsClickLog element, long recordTimestamp) {
                        return element.getTimestamp() * 1000L;
                    }
                });

        SingleOutputStreamOperator<String> main = env
                .socketTextStream("hadoop162", 8888)
                // Each line is expected to hold five comma-separated fields; fields 0, 1 and 4
                // are parsed as longs. NOTE(review): a malformed line throws and fails the job.
                .map(line -> {
                    String[] datas = line.split(",");
                    return new AdsClickLog(Long.valueOf(datas[0]),
                            Long.valueOf(datas[1]),
                            datas[2],
                            datas[3],
                            Long.valueOf(datas[4]));
                })
                .assignTimestampsAndWatermarks(ws)
                // One key per (user, ad) pair, so the state below counts one user's clicks on one ad.
                .keyBy(ads -> ads.getUserId() + " " + ads.getAdsId())
                .process(new BlackListFilter(blackList));

        main.print("main");
        main.getSideOutput(blackList).print("blacklist");

        try {
            env.execute();
        } catch (Exception e) {
            // Do not swallow job failures: surface them so the process terminates abnormally.
            throw new IllegalStateException("Flink job failed", e);
        }
    }

    /**
     * Counts clicks per key (user + ad) per calendar day.
     *
     * <p>Clicks below {@link #CLICK_LIMIT} are emitted on the main output. The click that reaches
     * the limit is emitted on the blacklist side output. Every later click for that key on the same
     * day is dropped. State is reset when a record from a later day arrives.
     */
    private static final class BlackListFilter extends KeyedProcessFunction<String, AdsClickLog, String> {

        private final OutputTag<String> blackListTag;

        /** {@code true} once the key is blacklisted for the tracked day; {@code null} otherwise. */
        private transient ValueState<Boolean> overState;
        /** The day ("yyyy-MM-dd") that the counters currently belong to. */
        private transient ValueState<String> dateState;
        /** Number of clicks counted for the tracked day. */
        private transient ReducingState<Long> clickState;
        /**
         * Day formatter, created in open(). It is only used from processElement, which Flink calls
         * from a single task thread, so the non-thread-safe SimpleDateFormat is not shared.
         */
        private transient SimpleDateFormat dayFormat;

        BlackListFilter(OutputTag<String> blackListTag) {
            this.blackListTag = blackListTag;
        }

        @Override
        public void open(Configuration parameters) throws Exception {
            clickState = getRuntimeContext().getReducingState(new ReducingStateDescriptor<Long>(
                    "clickState",
                    (ReduceFunction<Long>) Long::sum,
                    Long.class));
            overState = getRuntimeContext().getState(new ValueStateDescriptor<Boolean>("OverState", Boolean.class));
            dateState = getRuntimeContext().getState(new ValueStateDescriptor<String>("dateState", String.class));
            dayFormat = new SimpleDateFormat("yyyy-MM-dd");
        }

        /**
         * Counts one click and routes it to the main output, the blacklist side output, or nowhere.
         *
         * @param value the click record
         * @param ctx   context used to emit to the blacklist side output
         * @param out   main output collector
         * @throws Exception if state access fails
         */
        @Override
        public void processElement(AdsClickLog value,
                                   Context ctx,
                                   Collector<String> out) throws Exception {
            // Day boundaries follow the JVM default time zone of the task manager.
            String day = dayFormat.format(new Date(value.getTimestamp() * 1000L));
            String trackedDay = dateState.value();

            // Reset only for the first record of the key or when a LATER day begins ("yyyy-MM-dd"
            // compares chronologically as a string). A straggler from an earlier day, which the
            // out-of-orderness bound allows, must not wipe today's counters and blacklist flag;
            // it is counted against the tracked day instead.
            if (trackedDay == null || day.compareTo(trackedDay) > 0) {
                overState.clear();
                clickState.clear();
                dateState.update(day);
            }

            if (overState.value() != null) {
                // Already blacklisted for this day: the click is no longer counted.
                return;
            }

            clickState.add(1L);
            long clicks = clickState.get();
            String msg = "用户: " +
                    value.getUserId() +
                    "对广告: " + value.getAdsId() +
                    " 的点击量是: " + clicks;

            if (clicks >= CLICK_LIMIT) {
                ctx.output(blackListTag, msg);
                overState.update(true);
            } else {
                out.collect(msg);
            }
        }
    }

}
